import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date


object Main {

  /** Returns the current local time formatted as "yyyy-MM-dd HH:mm:ss". */
  def getDate(): String = {
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    formatter.format(new Date())
  }

  /** Builds a SystemDS .mtd metadata JSON document for a text-format matrix.
    *
    * @param rows number of matrix rows
    * @param cols number of matrix columns
    * @param nnz  number of non-zero entries
    * @return the metadata document as a JSON string (4-space indented body)
    */
  def makeMtd(rows: Long, cols: Long, nnz: Long): String = {
    val body = Seq(
      "{",
      """    "data_type": "matrix",""",
      """    "value_type": "double",""",
      s"""    "rows": ${rows},""",
      s"""    "cols": ${cols},""",
      s"""    "nnz": ${nnz},""",
      """    "format": "text",""",
      """    "author": "SystemDS",""",
      s"""    "created": "${getDate()}"""",
      "}"
    )
    body.mkString("\n")
  }

  // SLF4J logger for this object.
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  // Handle for the currently open HDFS output file. Set by openHdfsFile and
  // released by closeHdfsFile; mutable on purpose — it acts as the single
  // "open file" slot for the driver-side helpers below (not thread-safe).
  private var writer: BufferedWriter = null

  /** Creates (or overwrites) a file at the given HDFS path and stores a
    * buffered writer to it in the shared `writer` slot.
    *
    * @param sc   SparkContext supplying the Hadoop configuration
    * @param path fully qualified HDFS path of the file to create
    */
  @throws[Exception]
  def openHdfsFile(sc: SparkContext, path: String): Unit = {
    val fs = FileSystem.get(URI.create(path), sc.hadoopConfiguration)
    writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))))
    // `new` never returns null, so the original `if (null != writer)` guard
    // was dead code; the success log is unconditional (same behavior).
    logger.info("[HdfsOperate]>> initialize writer succeed!")
  }

  /** Writes `line` through the currently open HDFS writer.
    * Failures are logged and swallowed; the caller is not notified.
    */
  def writeString(line: String): Unit =
    try writer.write(line)
    catch {
      case e: Exception => logger.error("[HdfsOperate]>> writer a line error:", e)
    }

  /** Closes the shared HDFS writer if one is open; logs (never throws) on
    * error, and logs an error when no writer was open.
    */
  def closeHdfsFile(): Unit = {
    try {
      if (writer == null) {
        logger.error("[HdfsOperate]>> closeHdfsFile writer is null")
      } else {
        writer.close()
        logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!")
      }
    } catch {
      case e: Exception =>
        logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e)
    }
  }

  /** Checks whether `path` exists AND has content size > 0 bytes.
    * The size check guards against false positives from previously deleted
    * directories. NOTE: driver-side only (uses the driver's Hadoop config).
    *
    * @param sc   SparkContext supplying the Hadoop configuration
    * @param path HDFS path to probe
    * @return true when the path exists with a non-zero content length
    */
  def pathIsExist(sc: SparkContext, path: String): Boolean = {
    val hdfsPath = new Path(path)
    val fs = hdfsPath.getFileSystem(sc.hadoopConfiguration)
    // Short-circuits: getContentSummary is only called when the path exists.
    fs.exists(hdfsPath) && fs.getContentSummary(hdfsPath).getLength > 0
  }

  /** Entry point: converts a libsvm file into SystemDS ijv text format.
    *
    * Expects two arguments: the input libsvm path and the output directory.
    * Writes the feature matrix A to `<ijvPath>/a.ijv` (with `a.ijv.mtd`) and
    * the label vector B to `<ijvPath>/b.ijv` (with `b.ijv.mtd`).
    */
  def main(args: Array[String]): Unit = {
    // Validate CLI arguments BEFORE creating a SparkContext — the original
    // created the context first and leaked it on the usage early-return path.
    if (args.length != 2) {
      println("parameters: libsvmPath ijvPath")
      return
    }
    val libsvmPath = args(0)
    val ijvPath = args(1)

    val sparkConf = new SparkConf()
    //      .setAppName("libsvm2ijv")
    //      .setMaster("local[1]")
    val sc = new SparkContext(sparkConf)

    // try/finally guarantees sc.stop() on every exit path, including the
    // early return when the output path already exists (the original leaked
    // the context there too).
    try {
      println("start")
      println(s"libsvmPath=${libsvmPath}")
      println(s"ijvPath=${ijvPath}")

      if (pathIsExist(sc, ijvPath)) {
        println("ijvPath already exists")
        return
      }

      // Parse each libsvm line into (label, Array[(1-based colIndex, value)]),
      // then attach a 0-based row index. Only this parsed+indexed RDD is
      // reused below, so it alone is persisted — the original also persisted
      // the raw text RDD, which was never reused and only wasted memory.
      val r1 = sc.textFile(libsvmPath)
        .map { line =>
          val tokens = line.split(" ")
          val label = tokens(0).toDouble
          val features = tokens.drop(1).map { token =>
            val kv = token.split(":")
            (kv(0).toLong, kv(1).toDouble)
          }
          (label, features)
        }
        .zipWithIndex()
        .persist(MEMORY_AND_DISK)

      val data_num = r1.count()
      println(s"data_num=${data_num}")

      // Column count = largest feature index seen anywhere. A row with zero
      // features contributes 0 instead of crashing `.max` on an empty array
      // (the original threw UnsupportedOperationException on such rows).
      val a_feature_num = r1.map {
        case ((_, ai), _) => if (ai.isEmpty) 0L else ai.map(_._1).max
      }.max()
      println(s"a_feature_num=${a_feature_num}")

      // Total non-zeros of A = total number of feature tokens.
      val a_nnz = r1.map { case ((_, ai), _) => ai.length.toLong }
        .reduce(_ + _)
      println(s"a_nnz=${a_nnz}")

      val a_mtd = makeMtd(data_num, a_feature_num, a_nnz)
      println(s"a_mtd=\n ${a_mtd}")
      openHdfsFile(sc, ijvPath + "/a.ijv.mtd")
      writeString(a_mtd)
      closeHdfsFile()

      // B is a dense data_num x 1 label vector, so nnz == data_num.
      val b_mtd = makeMtd(data_num, 1, data_num)
      println(s"b_mtd=\n ${b_mtd}")
      openHdfsFile(sc, ijvPath + "/b.ijv.mtd")
      writeString(b_mtd)
      closeHdfsFile()

      // Emit A as 1-based "i j v" lines. The original wrapped each line in
      // String.format with no format args — a no-op that would throw
      // UnknownFormatConversionException if a value ever rendered a '%';
      // plain interpolation produces identical output safely.
      val a_txt = r1.flatMap {
        case ((_, ai), row_index) =>
          ai.map { case (col_index, value) => s"${row_index + 1} ${col_index} ${value}" }
      }
      a_txt.saveAsTextFile(ijvPath + "/a.ijv")
      println("saved a_txt to " + ijvPath + "/a.ijv")

      // Emit B as "i 1 label" lines (single-column vector).
      val b_txt = r1.map {
        case ((bi, _), row_index) => s"${row_index + 1} 1 ${bi}"
      }
      b_txt.saveAsTextFile(ijvPath + "/b.ijv")
      println("saved b_txt to " + ijvPath + "/b.ijv")

      println("end")
    } finally {
      sc.stop()
    }
  }
}
